-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[paimon-flink-cdc] Add the latest_schema state at schema evolution operator ,Reduce the latest schema access frequency #4535
Conversation
…latest schema access frequency
|
||
private final SchemaManager schemaManager; | ||
|
||
private final Identifier identifier; | ||
|
||
private ListState<DataField> latestSchemaListState; | ||
private List<DataField> latestSchemaList; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
latestSchemaListState cannot be set to final because it cannot be initialized in the constructor
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I mean latestSchemaList 'latestSchemaList' may be 'final', not latestSchemaListState.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok,thanks
actualUpdatedDataFields.forEach(field -> latestSchemaList.add(field)); | ||
} | ||
|
||
@Override |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
please add a ut
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added: SchemaEvolutionTest
latestSchemaListState.get().forEach(dataField -> latestSchemaList.add(dataField)); | ||
} else { | ||
RowType oldRowType = schemaManager.latest().get().logicalRowType(); | ||
oldRowType.getFields().forEach(dataField -> latestSchemaList.add(dataField)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
latestSchemaList.addAll(oldRowType.getFields());
applySchemaChange(schemaManager, schemaChange, identifier); | ||
} | ||
actualUpdatedDataFields.forEach(field -> latestSchemaList.add(field)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
latestSchemaList.addAll(actualUpdatedDataFields);
new ListStateDescriptor<>( | ||
"latest-schema-list-state", DataField.class)); | ||
if (context.isRestored()) { | ||
latestSchemaListState.get().forEach(dataField -> latestSchemaList.add(dataField)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
latestSchemaListState.get().forEach(latestSchemaList::add);
Hi @GangYang-HX , do we really need to put the schema to the state? Maybe just a field in memory is OK? |
Well, I have considered it before. If the state is not added when restoring from the state, there will be one more latest-schame access. |
@@ -37,25 +44,54 @@ | |||
* be 1. | |||
*/ | |||
public class UpdatedDataFieldsProcessFunction | |||
extends UpdatedDataFieldsProcessFunctionBase<List<DataField>, Void> { | |||
extends UpdatedDataFieldsProcessFunctionBase<List<DataField>, Void> | |||
implements CheckpointedFunction { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It doesn't need to be CheckpointedFunction
|
||
private final SchemaManager schemaManager; | ||
|
||
private final Identifier identifier; | ||
|
||
private final List<DataField> latestSchemaList; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@nullable
private List latestSchemaList
} | ||
|
||
private boolean dataFieldContainIgnoreId(DataField dataField) { | ||
return latestSchemaList.stream() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (latestSchemaList == null) {
RowType rowType = schemaManager.latest().get().logicalRowType();
latestSchemaList.addAll(rowType.getFields());
}
|
||
private final SchemaManager schemaManager; | ||
|
||
private final Identifier identifier; | ||
|
||
private final List<DataField> latestSchemaList; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe it can be:
Set<FieldIdentifier> latestFields;
FieldIdentifier {
String name;
DataType type;
String description;
}
…stFields in the open method
* non-SchemaChange.AddColumn scenario. Otherwise, the previously existing fields cannot be | ||
* modified again. | ||
*/ | ||
updateLatestFields(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we just add updatedDataFields to latestFields?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, this is the logic in previous versions, but there are risks in actual testing.
Reason: FieldIdentifier only guarantees the uniqueness of <name, type, description>. If any of the attributes is adjusted repeatedly, it will lead to misjudgment.
For example: Field A's type is int, and then it is changed to string. If you want to adjust it to int again, it will not work,because latestFields has already saved an element like <A, int, description>.
+1 |
Purpose
Linked issue: Issue-4521
In scenarios where the number of Paimon table fields is large and the Write concurrency is high, reduce the Latest-Schema access frequency to improve the throughput of job cold start
Tests
Case-1: Observe whether the checkpoint time of schema evolution changes
Conclusion: After optimization, Schema Evolution is basically completed in seconds, or even milliseconds.
Case-2: Observe the log to see if there are still a large number of read schema behaviors
Conclusion: From hundreds of thousands to 115 times
API and Format
org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunction#processElement
Documentation
Before the Schema Evolution operator calls org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunctionBase#extractSchemaChanges, add a judgment to confirm whether the field update really needs to be triggered.